{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import dask.distributed\n",
    "import dask.dataframe as dd\n",
    "import pandas as pd\n",
    "import numpy as np"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import geopandas\n",
    "from shapely.geometry import Point"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "client = dask.distributed.Client()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def assign_taxi_zones(df, lon_var, lat_var, locid_var):\n",
    "    \"\"\"Joins DataFrame with Taxi Zones shapefile.\n",
    "\n",
    "    This function takes longitude values provided by `lon_var`, and latitude\n",
    "    values provided by `lat_var` in DataFrame `df`, and performs a spatial join\n",
    "    with the NYC taxi_zones shapefile. \n",
    "\n",
    "    The shapefile is hard coded in, as this function makes a hard assumption of\n",
    "    latitude and longitude coordinates. It also assumes latitude=0 and \n",
    "    longitude=0 is not a datapoint that can exist in your dataset. Which is \n",
    "    reasonable for a dataset of New York, but bad for a global dataset.\n",
    "\n",
    "    Only rows where `df.lon_var`, `df.lat_var` are reasonably near New York,\n",
    "    and `df.locid_var` is set to np.nan are updated. \n",
    "\n",
    "    Parameters\n",
    "    ----------\n",
    "    df : pandas.DataFrame or dask.DataFrame\n",
    "        DataFrame containing latitudes, longitudes, and location_id columns.\n",
    "    lon_var : string\n",
    "        Name of column in `df` containing longitude values. Invalid values \n",
    "        should be np.nan.\n",
    "    lat_var : string\n",
    "        Name of column in `df` containing latitude values. Invalid values \n",
    "        should be np.nan\n",
    "    locid_var : string\n",
    "        Name of column in `df` containing taxi_zone location ids. Rows with\n",
    "        valid, nonzero values are not overwritten. \n",
    "    \"\"\"\n",
    "\n",
    "    localdf = df[[lon_var, lat_var, locid_var]].copy()\n",
    "    # localdf = localdf.reset_index()\n",
    "    localdf[lon_var] = localdf[lon_var].fillna(value=0.)\n",
    "    localdf[lat_var] = localdf[lat_var].fillna(value=0.)\n",
    "    localdf['replace_locid'] = (localdf[locid_var].isnull()\n",
    "                                & (localdf[lon_var] != 0.)\n",
    "                                & (localdf[lat_var] != 0.))\n",
    "\n",
    "    if (np.any(localdf['replace_locid'])):\n",
    "        shape_df = geopandas.read_file('../shapefiles/taxi_zones_latlon.shp')\n",
    "        shape_df.drop(['OBJECTID', \"Shape_Area\", \"Shape_Leng\", \"borough\", \"zone\"],\n",
    "                      axis=1, inplace=True)\n",
    "\n",
    "        try:\n",
    "            local_gdf = geopandas.GeoDataFrame(\n",
    "                localdf, crs={'init': 'epsg:4326'},\n",
    "                geometry=[Point(xy) for xy in\n",
    "                          zip(localdf[lon_var], localdf[lat_var])])\n",
    "\n",
    "            local_gdf = geopandas.sjoin(\n",
    "                local_gdf, shape_df, how='left', op='intersects')\n",
    "\n",
    "            # one point can intersect more than one zone -- for example if on\n",
    "            # the boundary between two zones. Deduplicate by taking first valid.\n",
    "            local_gdf = local_gdf[~local_gdf.index.duplicated(keep='first')]\n",
    "\n",
    "            local_gdf.LocationID.values[~local_gdf.replace_locid] = (\n",
    "                (local_gdf[locid_var])[~local_gdf.replace_locid]).values\n",
    "\n",
    "            return local_gdf.LocationID.rename(locid_var)\n",
    "        except ValueError as ve:\n",
    "            print(ve)\n",
    "            print(ve.stacktrace())\n",
    "            return df[locid_var]\n",
    "    else:\n",
    "        return df[locid_var]\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df1 = dd.read_parquet('/bigdata/citibike.parquet')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df1['start_taxizone_id'] = np.nan\n",
    "df1['end_taxizone_id'] = np.nan"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "36902025"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df1.start_station_id.count().compute()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>trip_duration</th>\n",
       "      <th>start_time</th>\n",
       "      <th>stop_time</th>\n",
       "      <th>start_station_id</th>\n",
       "      <th>start_station_name</th>\n",
       "      <th>start_station_latitude</th>\n",
       "      <th>start_station_longitude</th>\n",
       "      <th>end_station_id</th>\n",
       "      <th>end_station_name</th>\n",
       "      <th>end_station_latitude</th>\n",
       "      <th>end_station_longitude</th>\n",
       "      <th>bike_id</th>\n",
       "      <th>user_type</th>\n",
       "      <th>birth_year</th>\n",
       "      <th>gender</th>\n",
       "      <th>start_taxizone_id</th>\n",
       "      <th>end_taxizone_id</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>634</td>\n",
       "      <td>2013-07-01 00:00:00</td>\n",
       "      <td>2013-07-01 00:10:34</td>\n",
       "      <td>164</td>\n",
       "      <td>E 47 St &amp; 2 Ave</td>\n",
       "      <td>40.753231</td>\n",
       "      <td>-73.970322</td>\n",
       "      <td>504</td>\n",
       "      <td>1 Ave &amp; E 15 St</td>\n",
       "      <td>40.732220</td>\n",
       "      <td>-73.981659</td>\n",
       "      <td>16950</td>\n",
       "      <td>Customer</td>\n",
       "      <td>NaN</td>\n",
       "      <td>0</td>\n",
       "      <td>NaN</td>\n",
       "      <td>NaN</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>1547</td>\n",
       "      <td>2013-07-01 00:00:02</td>\n",
       "      <td>2013-07-01 00:25:49</td>\n",
       "      <td>388</td>\n",
       "      <td>W 26 St &amp; 10 Ave</td>\n",
       "      <td>40.749718</td>\n",
       "      <td>-74.002953</td>\n",
       "      <td>459</td>\n",
       "      <td>W 20 St &amp; 11 Ave</td>\n",
       "      <td>40.746746</td>\n",
       "      <td>-74.007759</td>\n",
       "      <td>19816</td>\n",
       "      <td>Customer</td>\n",
       "      <td>NaN</td>\n",
       "      <td>0</td>\n",
       "      <td>NaN</td>\n",
       "      <td>NaN</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>178</td>\n",
       "      <td>2013-07-01 00:01:04</td>\n",
       "      <td>2013-07-01 00:04:02</td>\n",
       "      <td>293</td>\n",
       "      <td>Lafayette St &amp; E 8 St</td>\n",
       "      <td>40.730286</td>\n",
       "      <td>-73.990768</td>\n",
       "      <td>237</td>\n",
       "      <td>E 11 St &amp; 2 Ave</td>\n",
       "      <td>40.730473</td>\n",
       "      <td>-73.986725</td>\n",
       "      <td>14548</td>\n",
       "      <td>Subscriber</td>\n",
       "      <td>1980.0</td>\n",
       "      <td>2</td>\n",
       "      <td>NaN</td>\n",
       "      <td>NaN</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>1580</td>\n",
       "      <td>2013-07-01 00:01:06</td>\n",
       "      <td>2013-07-01 00:27:26</td>\n",
       "      <td>531</td>\n",
       "      <td>Forsyth St &amp; Broome St</td>\n",
       "      <td>40.718941</td>\n",
       "      <td>-73.992661</td>\n",
       "      <td>499</td>\n",
       "      <td>Broadway &amp; W 60 St</td>\n",
       "      <td>40.769154</td>\n",
       "      <td>-73.981918</td>\n",
       "      <td>16063</td>\n",
       "      <td>Customer</td>\n",
       "      <td>NaN</td>\n",
       "      <td>0</td>\n",
       "      <td>NaN</td>\n",
       "      <td>NaN</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>757</td>\n",
       "      <td>2013-07-01 00:01:10</td>\n",
       "      <td>2013-07-01 00:13:47</td>\n",
       "      <td>382</td>\n",
       "      <td>University Pl &amp; E 14 St</td>\n",
       "      <td>40.734928</td>\n",
       "      <td>-73.992004</td>\n",
       "      <td>410</td>\n",
       "      <td>Suffolk St &amp; Stanton St</td>\n",
       "      <td>40.720665</td>\n",
       "      <td>-73.985176</td>\n",
       "      <td>19213</td>\n",
       "      <td>Subscriber</td>\n",
       "      <td>1986.0</td>\n",
       "      <td>1</td>\n",
       "      <td>NaN</td>\n",
       "      <td>NaN</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "   trip_duration          start_time           stop_time  start_station_id  \\\n",
       "0            634 2013-07-01 00:00:00 2013-07-01 00:10:34               164   \n",
       "1           1547 2013-07-01 00:00:02 2013-07-01 00:25:49               388   \n",
       "2            178 2013-07-01 00:01:04 2013-07-01 00:04:02               293   \n",
       "3           1580 2013-07-01 00:01:06 2013-07-01 00:27:26               531   \n",
       "4            757 2013-07-01 00:01:10 2013-07-01 00:13:47               382   \n",
       "\n",
       "        start_station_name  start_station_latitude  start_station_longitude  \\\n",
       "0          E 47 St & 2 Ave               40.753231               -73.970322   \n",
       "1         W 26 St & 10 Ave               40.749718               -74.002953   \n",
       "2    Lafayette St & E 8 St               40.730286               -73.990768   \n",
       "3   Forsyth St & Broome St               40.718941               -73.992661   \n",
       "4  University Pl & E 14 St               40.734928               -73.992004   \n",
       "\n",
       "   end_station_id         end_station_name  end_station_latitude  \\\n",
       "0             504          1 Ave & E 15 St             40.732220   \n",
       "1             459         W 20 St & 11 Ave             40.746746   \n",
       "2             237          E 11 St & 2 Ave             40.730473   \n",
       "3             499       Broadway & W 60 St             40.769154   \n",
       "4             410  Suffolk St & Stanton St             40.720665   \n",
       "\n",
       "   end_station_longitude  bike_id   user_type  birth_year  gender  \\\n",
       "0             -73.981659    16950    Customer         NaN       0   \n",
       "1             -74.007759    19816    Customer         NaN       0   \n",
       "2             -73.986725    14548  Subscriber      1980.0       2   \n",
       "3             -73.981918    16063    Customer         NaN       0   \n",
       "4             -73.985176    19213  Subscriber      1986.0       1   \n",
       "\n",
       "   start_taxizone_id  end_taxizone_id  \n",
       "0                NaN              NaN  \n",
       "1                NaN              NaN  \n",
       "2                NaN              NaN  \n",
       "3                NaN              NaN  \n",
       "4                NaN              NaN  "
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df1.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df1['start_taxizone_id'] = df1.map_partitions(\n",
    "    assign_taxi_zones, \"start_station_longitude\", \"start_station_latitude\",\n",
    "    \"start_taxizone_id\", meta=('start_taxizone_id', np.float64))\n",
    "df1['end_taxizone_id'] = df1.map_partitions(\n",
    "    assign_taxi_zones, \"end_station_longitude\", \"end_station_latitude\",\n",
    "    \"end_taxizone_id\", meta=('end_taxizone_id', np.float64))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>trip_duration</th>\n",
       "      <th>start_time</th>\n",
       "      <th>stop_time</th>\n",
       "      <th>start_station_id</th>\n",
       "      <th>start_station_name</th>\n",
       "      <th>start_station_latitude</th>\n",
       "      <th>start_station_longitude</th>\n",
       "      <th>end_station_id</th>\n",
       "      <th>end_station_name</th>\n",
       "      <th>end_station_latitude</th>\n",
       "      <th>end_station_longitude</th>\n",
       "      <th>bike_id</th>\n",
       "      <th>user_type</th>\n",
       "      <th>birth_year</th>\n",
       "      <th>gender</th>\n",
       "      <th>start_taxizone_id</th>\n",
       "      <th>end_taxizone_id</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>634</td>\n",
       "      <td>2013-07-01 00:00:00</td>\n",
       "      <td>2013-07-01 00:10:34</td>\n",
       "      <td>164</td>\n",
       "      <td>E 47 St &amp; 2 Ave</td>\n",
       "      <td>40.753231</td>\n",
       "      <td>-73.970322</td>\n",
       "      <td>504</td>\n",
       "      <td>1 Ave &amp; E 15 St</td>\n",
       "      <td>40.732220</td>\n",
       "      <td>-73.981659</td>\n",
       "      <td>16950</td>\n",
       "      <td>Customer</td>\n",
       "      <td>NaN</td>\n",
       "      <td>0</td>\n",
       "      <td>233</td>\n",
       "      <td>224</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>1547</td>\n",
       "      <td>2013-07-01 00:00:02</td>\n",
       "      <td>2013-07-01 00:25:49</td>\n",
       "      <td>388</td>\n",
       "      <td>W 26 St &amp; 10 Ave</td>\n",
       "      <td>40.749718</td>\n",
       "      <td>-74.002953</td>\n",
       "      <td>459</td>\n",
       "      <td>W 20 St &amp; 11 Ave</td>\n",
       "      <td>40.746746</td>\n",
       "      <td>-74.007759</td>\n",
       "      <td>19816</td>\n",
       "      <td>Customer</td>\n",
       "      <td>NaN</td>\n",
       "      <td>0</td>\n",
       "      <td>246</td>\n",
       "      <td>246</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>178</td>\n",
       "      <td>2013-07-01 00:01:04</td>\n",
       "      <td>2013-07-01 00:04:02</td>\n",
       "      <td>293</td>\n",
       "      <td>Lafayette St &amp; E 8 St</td>\n",
       "      <td>40.730286</td>\n",
       "      <td>-73.990768</td>\n",
       "      <td>237</td>\n",
       "      <td>E 11 St &amp; 2 Ave</td>\n",
       "      <td>40.730473</td>\n",
       "      <td>-73.986725</td>\n",
       "      <td>14548</td>\n",
       "      <td>Subscriber</td>\n",
       "      <td>1980.0</td>\n",
       "      <td>2</td>\n",
       "      <td>113</td>\n",
       "      <td>79</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>1580</td>\n",
       "      <td>2013-07-01 00:01:06</td>\n",
       "      <td>2013-07-01 00:27:26</td>\n",
       "      <td>531</td>\n",
       "      <td>Forsyth St &amp; Broome St</td>\n",
       "      <td>40.718941</td>\n",
       "      <td>-73.992661</td>\n",
       "      <td>499</td>\n",
       "      <td>Broadway &amp; W 60 St</td>\n",
       "      <td>40.769154</td>\n",
       "      <td>-73.981918</td>\n",
       "      <td>16063</td>\n",
       "      <td>Customer</td>\n",
       "      <td>NaN</td>\n",
       "      <td>0</td>\n",
       "      <td>148</td>\n",
       "      <td>142</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>757</td>\n",
       "      <td>2013-07-01 00:01:10</td>\n",
       "      <td>2013-07-01 00:13:47</td>\n",
       "      <td>382</td>\n",
       "      <td>University Pl &amp; E 14 St</td>\n",
       "      <td>40.734928</td>\n",
       "      <td>-73.992004</td>\n",
       "      <td>410</td>\n",
       "      <td>Suffolk St &amp; Stanton St</td>\n",
       "      <td>40.720665</td>\n",
       "      <td>-73.985176</td>\n",
       "      <td>19213</td>\n",
       "      <td>Subscriber</td>\n",
       "      <td>1986.0</td>\n",
       "      <td>1</td>\n",
       "      <td>113</td>\n",
       "      <td>148</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "   trip_duration          start_time           stop_time  start_station_id  \\\n",
       "0            634 2013-07-01 00:00:00 2013-07-01 00:10:34               164   \n",
       "1           1547 2013-07-01 00:00:02 2013-07-01 00:25:49               388   \n",
       "2            178 2013-07-01 00:01:04 2013-07-01 00:04:02               293   \n",
       "3           1580 2013-07-01 00:01:06 2013-07-01 00:27:26               531   \n",
       "4            757 2013-07-01 00:01:10 2013-07-01 00:13:47               382   \n",
       "\n",
       "        start_station_name  start_station_latitude  start_station_longitude  \\\n",
       "0          E 47 St & 2 Ave               40.753231               -73.970322   \n",
       "1         W 26 St & 10 Ave               40.749718               -74.002953   \n",
       "2    Lafayette St & E 8 St               40.730286               -73.990768   \n",
       "3   Forsyth St & Broome St               40.718941               -73.992661   \n",
       "4  University Pl & E 14 St               40.734928               -73.992004   \n",
       "\n",
       "   end_station_id         end_station_name  end_station_latitude  \\\n",
       "0             504          1 Ave & E 15 St             40.732220   \n",
       "1             459         W 20 St & 11 Ave             40.746746   \n",
       "2             237          E 11 St & 2 Ave             40.730473   \n",
       "3             499       Broadway & W 60 St             40.769154   \n",
       "4             410  Suffolk St & Stanton St             40.720665   \n",
       "\n",
       "   end_station_longitude  bike_id   user_type  birth_year  gender  \\\n",
       "0             -73.981659    16950    Customer         NaN       0   \n",
       "1             -74.007759    19816    Customer         NaN       0   \n",
       "2             -73.986725    14548  Subscriber      1980.0       2   \n",
       "3             -73.981918    16063    Customer         NaN       0   \n",
       "4             -73.985176    19213  Subscriber      1986.0       1   \n",
       "\n",
       "   start_taxizone_id  end_taxizone_id  \n",
       "0                233              224  \n",
       "1                246              246  \n",
       "2                113               79  \n",
       "3                148              142  \n",
       "4                113              148  "
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df1.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df1.to_parquet('/bigdata/citibike_locid.parquet', has_nulls=True, compression=\"SNAPPY\",\n",
    "              object_encoding='json')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df1 = dd.read_parquet('/bigdata/citibike_locid.parquet')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "36902025"
      ]
     },
     "execution_count": 15,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df1.start_station_id.count().compute()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df2 = df1[df1.start_taxizone_id.notnull() & df1.end_taxizone_id.notnull()]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "36901697"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df2.start_station_id.count().compute()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df2.to_parquet('/bigdata/citibike_locid_cleaned.parquet', has_nulls=True, compression=\"SNAPPY\",\n",
    "              object_encoding='json')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df2 = spark.read.parquet('/bigdata/citibike_locid_cleaned.parquet')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Help on method parquet in module pyspark.sql.readwriter:\n",
      "\n",
      "parquet(path, mode=None, partitionBy=None, compression=None) method of pyspark.sql.readwriter.DataFrameWriter instance\n",
      "    Saves the content of the :class:`DataFrame` in Parquet format at the specified path.\n",
      "    \n",
      "    :param path: the path in any Hadoop supported file system\n",
      "    :param mode: specifies the behavior of the save operation when data already exists.\n",
      "    \n",
      "        * ``append``: Append contents of this :class:`DataFrame` to existing data.\n",
      "        * ``overwrite``: Overwrite existing data.\n",
      "        * ``ignore``: Silently ignore this operation if data already exists.\n",
      "        * ``error`` (default case): Throw an exception if data already exists.\n",
      "    :param partitionBy: names of partitioning columns\n",
      "    :param compression: compression codec to use when saving to file. This can be one of the\n",
      "                        known case-insensitive shorten names (none, snappy, gzip, and lzo).\n",
      "                        This will override ``spark.sql.parquet.compression.codec``. If None\n",
      "                        is set, it uses the value specified in\n",
      "                        ``spark.sql.parquet.compression.codec``.\n",
      "    \n",
      "    >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))\n",
      "    \n",
      "    .. versionadded:: 1.4\n",
      "\n"
     ]
    }
   ],
   "source": [
    "help(df2.write.parquet)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df2.write.parquet('/bigdata/citibike_locid_cleaned_partitioned.parquet', \n",
    "                  mode='overwrite', partitionBy='start_taxizone_id',\n",
    "                  compression='SNAPPY'\n",
    "                 )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df2 = dd.read_parquet('/bigdata/citibike_locid_cleaned.parquet')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/home/shekhar/anaconda3/lib/python3.5/site-packages/matplotlib/font_manager.py:273: UserWarning: Matplotlib is building the font cache using fc-list. This may take a moment.\n",
      "  warnings.warn('Matplotlib is building the font cache using fc-list. This may take a moment.')\n",
      "/home/shekhar/anaconda3/lib/python3.5/site-packages/matplotlib/font_manager.py:273: UserWarning: Matplotlib is building the font cache using fc-list. This may take a moment.\n",
      "  warnings.warn('Matplotlib is building the font cache using fc-list. This may take a moment.')\n"
     ]
    }
   ],
   "source": [
    "%matplotlib inline\n",
    "import matplotlib.pyplot as plt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "zz =df2.start_taxizone_id.unique().values.compute()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[1.9762625833649862e-323,\n",
       " 5.9287877500949585e-323,\n",
       " 6.4228533959362051e-323,\n",
       " 8.3991159793011913e-323,\n",
       " 1.1857575500189917e-322,\n",
       " 1.2351641146031164e-322,\n",
       " 1.6304166312761136e-322,\n",
       " 1.6798231958602383e-322,\n",
       " 1.8280428896126122e-322,\n",
       " 1.9762625833649862e-322,\n",
       " 2.1244822771173601e-322,\n",
       " 2.2232954062856094e-322,\n",
       " 2.3715151000379834e-322,\n",
       " 2.4209216646221081e-322,\n",
       " 2.4703282292062327e-322,\n",
       " 2.569141358374482e-322,\n",
       " 2.6679544875427313e-322,\n",
       " 3.0138004396316039e-322,\n",
       " 3.2114266979681025e-322,\n",
       " 3.2608332625522272e-322,\n",
       " 3.3596463917204765e-322,\n",
       " 3.7054923438093491e-322,\n",
       " 3.9031186021458477e-322,\n",
       " 3.9525251667299724e-322,\n",
       " 4.2983711188188449e-322,\n",
       " 4.3477776834029696e-322,\n",
       " 4.4465908125712189e-322,\n",
       " 4.7924367646600915e-322,\n",
       " 4.9406564584124654e-322,\n",
       " 5.0888761521648394e-322,\n",
       " 5.2370958459172134e-322,\n",
       " 5.286502410501338e-322,\n",
       " 5.5335352334219613e-322,\n",
       " 5.5829417980060859e-322,\n",
       " 5.6323483625902106e-322,\n",
       " 6.1758205730155818e-322,\n",
       " 6.7686993480250777e-322,\n",
       " 6.9169190417774516e-322,\n",
       " 6.9663256063615763e-322,\n",
       " 7.0157321709457009e-322,\n",
       " 7.0651387355298256e-322,\n",
       " 7.1145453001139502e-322,\n",
       " 7.1639518646980749e-322,\n",
       " 7.2133584292821995e-322,\n",
       " 7.3121715584504489e-322,\n",
       " 7.4603912522028228e-322,\n",
       " 7.8062372042916954e-322,\n",
       " 7.9544568980440694e-322,\n",
       " 8.003863462628194e-322,\n",
       " 8.0532700272123187e-322,\n",
       " 8.1026765917964433e-322,\n",
       " 8.3991159793011913e-322,\n",
       " 8.9425881897265624e-322,\n",
       " 9.1896210126471857e-322,\n",
       " 9.3378407063995597e-322,\n",
       " 9.3872472709836843e-322,\n",
       " 9.5354669647360583e-322,\n",
       " 9.6342800939043076e-322,\n",
       " 1.0325971998082053e-321,\n",
       " 1.0424785127250302e-321,\n",
       " 1.072122451475505e-321,\n",
       " 1.1067070466843923e-321,\n",
       " 1.1116477031428047e-321,\n",
       " 1.1165883596012172e-321,\n",
       " 1.1264696725180421e-321,\n",
       " 1.1314103289764546e-321,\n",
       " 1.1363509854348671e-321,\n",
       " 1.1412916418932795e-321,\n",
       " 1.146232298351692e-321,\n",
       " 1.1511729548101044e-321,\n",
       " 1.1561136112685169e-321,\n",
       " 1.1659949241853418e-321,\n",
       " 1.1709355806437543e-321,\n",
       " 1.1758762371021668e-321,\n",
       " 1.1808168935605792e-321,\n",
       " 1.2154014887694665e-321,\n",
       " 1.2302234581447039e-321,\n",
       " 1.2598673968951787e-321,\n",
       " 1.2648080533535912e-321,\n",
       " 1.2895113356456535e-321,\n",
       " 1.2944519921040659e-321,\n",
       " 1.2993926485624784e-321,\n",
       " 4.0,\n",
       " 12.0,\n",
       " 13.0,\n",
       " 17.0,\n",
       " 24.0,\n",
       " 25.0,\n",
       " 33.0,\n",
       " 34.0,\n",
       " 37.0,\n",
       " 40.0,\n",
       " 43.0,\n",
       " 45.0,\n",
       " 48.0,\n",
       " 49.0,\n",
       " 50.0,\n",
       " 52.0,\n",
       " 54.0,\n",
       " 61.0,\n",
       " 65.0,\n",
       " 66.0,\n",
       " 68.0,\n",
       " 75.0,\n",
       " 79.0,\n",
       " 80.0,\n",
       " 87.0,\n",
       " 88.0,\n",
       " 90.0,\n",
       " 97.0,\n",
       " 100.0,\n",
       " 103.0,\n",
       " 106.0,\n",
       " 107.0,\n",
       " 112.0,\n",
       " 113.0,\n",
       " 114.0,\n",
       " 125.0,\n",
       " 137.0,\n",
       " 140.0,\n",
       " 141.0,\n",
       " 142.0,\n",
       " 143.0,\n",
       " 144.0,\n",
       " 145.0,\n",
       " 146.0,\n",
       " 148.0,\n",
       " 151.0,\n",
       " 158.0,\n",
       " 161.0,\n",
       " 162.0,\n",
       " 163.0,\n",
       " 164.0,\n",
       " 170.0,\n",
       " 181.0,\n",
       " 186.0,\n",
       " 189.0,\n",
       " 190.0,\n",
       " 193.0,\n",
       " 195.0,\n",
       " 209.0,\n",
       " 211.0,\n",
       " 217.0,\n",
       " 224.0,\n",
       " 225.0,\n",
       " 226.0,\n",
       " 228.0,\n",
       " 229.0,\n",
       " 230.0,\n",
       " 231.0,\n",
       " 232.0,\n",
       " 233.0,\n",
       " 234.0,\n",
       " 236.0,\n",
       " 237.0,\n",
       " 238.0,\n",
       " 239.0,\n",
       " 246.0,\n",
       " 249.0,\n",
       " 255.0,\n",
       " 256.0,\n",
       " 261.0,\n",
       " 262.0,\n",
       " 263.0]"
      ]
     },
     "execution_count": 28,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sorted(zz)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python [default]",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
